# Broker 源码分析

作者:Ethan.Yang
博客:https://blog.ethanyang.cn (opens new window)


Broker模块涉及到的内容非常多,以下技术点作为本篇文章的重点讲解:

1、Broker启动流程分析

2、消息存储设计

3、消息写入流程

4、亮点分析:NRS与NRC的功能号设计

5、亮点分析:同步双写数倍性能提升的CompletableFuture

6、亮点分析:Commitlog写入时使用可重入锁还是自旋锁?

7、亮点分析:零拷贝技术之MMAP提升文件读写性能

8、亮点分析:堆外内存机制

# 1. Broker 启动流程分析

Broker 是 RocketMQ 中处理最密集的模块,因此启动流程理解非常关键。

核心启动流程如下:

public class BrokerStartup {
    public static Properties properties = null;
    public static CommandLine commandLine = null;
    public static String configFile = null;
    public static InternalLogger log;

    public static void main(String[] args) {
        //todo  有两步,一个 create, 一个start
        start(createBrokerController(args));
    }
}   

public static BrokerController createBrokerController(String[] args) {
    // .... 省略
    final BrokerController controller = new BrokerController(
                brokerConfig,
                nettyServerConfig,
                nettyClientConfig,
                messageStoreConfig);
            // remember all configs to prevent discard
            controller.getConfiguration().registerConfig(properties);
            //初始化
            boolean initResult = controller.initialize();
        // .... 省略
}    

public boolean initialize() throws CloneNotSupportedException {
        //todo 加载Broker中的主题信息  json
        boolean result = this.topicConfigManager.load();
        //todo 加载消费进度
        result = result && this.consumerOffsetManager.load();
        //todo 加载订阅信息
        result = result && this.subscriptionGroupManager.load();
        //todo 加载订消费者过滤信息
        result = result && this.consumerFilterManager.load();
        // .... 省略
}


public static BrokerController start(BrokerController controller) {
// .... 省略
      controller.start();
    // .... 省略
}


public void start() throws Exception {
    //启动消息存储组件
        if (this.messageStore != null) {
            this.messageStore.start();
        }
        //启动netty服务器
        if (this.remotingServer != null) {
            this.remotingServer.start();
        }
    // .... 省略
    // broker每隔30s向NameServer发送心跳包
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    //todo 向NameServer发送心跳包
                    BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
                } catch (Throwable e) {
                    log.error("registerBrokerAll Exception", e);
                }
            }
        }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72

# 2. 消息存储设计

Kafka 的文件布局以 Topic/Partition 为基本单位。每个分区对应一个独立目录,消息在分区内顺序写入。随着 Topic 和 Partition 数量增多,Broker 上的分区文件数量随之增长,消息写入会变得更加分散,最终使磁盘 IO 在大量分区中竞争,从高度顺序写退化为“伪随机写”。这会影响整体写入稳定性,尤其是在主题数量和消费者数量非常多的业务场景中。

RocketMQ 的设计完全不同:

  • 所有 Topic 的消息都顺序写入同一个 CommitLog 文件
  • 消费逻辑通过 ConsumeQueue 索引提供
  • 消息写入始终保持严格的顺序写,无论 Topic 有多少

RocketMQ 在多 Topic、多消费者、高并发业务场景下,仍能保持消息一致性、顺序性和写入稳定性。 Kafka 更强调分区级别的吞吐扩展性,但顺序性和一致性容易受到分区策略影响。

# 3. 存储文件设计

RocketMQ 的存储体系由三类核心文件构成:CommitLog、ConsumeQueue、IndexFile。 这三类文件共同支撑了 RocketMQ 的高吞吐顺序写、顺序消费、高效定位等核心能力:

  • CommitLog:消息物理存储(所有消息统一顺序写入)
  • ConsumeQueue:消费索引(按 Topic/Queue 构建固定长度顺序索引)
  • IndexFile:Key 查询索引(按 key 或 uniqKey 快速定位消息)

RocketMQ 通过 “物理日志 + 逻辑队列 + 辅助索引” 的分层设计,实现了:

  • 写入时极致顺序写性能
  • 消费时按 MessageQueue 顺序读取
  • 运维和查询场景下通过 Key/时间快速检索消息

# 3.1 存储目录结构概览

CommitLog 与 ConsumeQueue 的存储结构如下图所示(省略部分):

对应的真实文件目录结构如下:

store/
 ├── commitlog/
 │     ├── 00000000000000000000      // CommitLog 按 1GB 滚动切分,文件名为起始偏移量
 │     ├── 00000001073741824000
 │     └── ...
 ├── consumequeue/
 │     ├── Topic1/
 │     │     ├── 0/                  // Topic1 的 Queue0
 │     │     │     ├── 00000000000000000000   // ConsumeQueue 以 Queue 为粒度拆分
 │     │     │     ├── 00000000000000524288   // 文件名为该文件的起始逻辑 offset * 20B
 │     │     │     └── ...
 │     ├── Topic2/
 │     │     ├── 0/
 │     │     ├── 1/
 │     │     └── ...
 ├── index/
 │     ├── 202404021300.index
 │     └── ...
 └── config/
       ├── topics.json
       ├── subscriptionGroup.json
       ├── delayOffset.json
       ├── consumerOffset.json
       └── ...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

# 3.2 三类文件的作用与定位

# 1) CommitLog

  • RocketMQ 所有 Topic、所有 Queue 的真实消息,全部写入统一的 CommitLog
  • 文件为 顺序追加(Append-Only)
  • 每条消息可变长
  • 1GB 自动切分(由 mappedFileSizeCommitLog 控制)
  • 多个物理文件构成一个逻辑连续的 CommitLog

顺序写磁盘性能远高于随机写(NVMe 顺序写可达数百 MB/s),这也是 RocketMQ 整体高吞吐的基础。

# 2) ConsumeQueue

ConsumeQueue 是 每个 Topic/Queue 的独立逻辑队列,其作用是让消费者无需扫描 CommitLog,而是:

“通过固定长度索引顺序读取消息”。

每条索引记录固定为 20 字节

字段 大小 说明
commitLogOffset 8B 指向 CommitLog 内消息物理位置
msgSize 4B 消息大小(便于跳转)
tagHashCode 8B Tag 过滤加速(Push 模式)

ConsumeQueue 文件结构特点:

  • Queue 粒度拆分文件(Queue0、Queue1…)
  • 每个 ConsumeQueue 文件固定大小约 5MB
  • 写满后自动滚动产生新文件(多个文件构成逻辑队列)
  • 读取时直接 mmap 至内存,顺序扫描性能极高

消费流程一定是通过 ConsumeQueue 索引 → CommitLog,不会跳过 ConsumeQueue。

# 3) IndexFile

IndexFile 是用于 非顺序查询 的可选结构,主要用于:

  • 按 messageKey 或 uniqKey 查询消息
  • 事务消息回查
  • 按时间戳查消息
  • 运维场景(查找 DLQ、恢复消息等)

其结构为典型的 Hash Slot + 冲突链表

  • HashSlot 数组:存储链表入口
  • IndexItems:链式存储 commitLogOffset 与时间戳

IndexFile 提供高效查询能力,但不参与顺序消费。

# 3.3 CommitLog 与 ConsumeQueue 的存储结构

# CommitLog

逻辑上:

  • 一个无限增长的顺序日志文件

物理上:

  • 由多个固定大小(默认 1GB)的 MappedFile 组成
  • 多文件构成一个逻辑连续地址空间
  • 文件名 = 起始偏移量(20 位数字)

示例:

00000000000000000000   // offset = 0
00000001073741824000   // offset = 1GB
00000002147483648000   // offset = 2GB
1
2
3

Broker 统一向末尾 append,新文件按需创建。

# ConsumeQueue

逻辑上:

  • 每个 Topic 下的每个 Queue 是一个独立的逻辑队列
  • 每个 Queue 有自己的 offset 0、offset 1…

物理上:

  • 以 Queue 为粒度生成目录
  • 每个 Queue 目录内存放多个固定大小的 ConsumeQueue 文件(~5MB)
  • 文件名 = 起始 index * 20 字节

示例:

00000000000000000000   // 从逻辑 offset = 0 开始
00000000000000524288   // 从逻辑 offset = 262144 开始
1
2

ConsumeQueue 的“固定长度 + 多文件分块”模式,保证了:

  • O(1) 的消息定位能力
  • 顺序消费的高性能
  • 快速恢复与快速删除(文件分段)
  • mmap 友好,减少 PageCache 压力

# 3.4 消息存储结构源码对应

public boolean initialize() throws CloneNotSupportedException {
        //todo 加载Broker中的主题信息  json
        boolean result = this.topicConfigManager.load();
        //todo 加载消费进度
        result = result && this.consumerOffsetManager.load();
        //todo 加载订阅信息
        result = result && this.subscriptionGroupManager.load();
        //todo 加载订消费者过滤信息
        result = result && this.consumerFilterManager.load();
    .... 省略
        result = result && this.messageStore.load();

}       


public boolean load() {
        boolean result = true;

        try {
            boolean lastExitOK = !this.isTempFileExist();
            log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");

            if (null != scheduleMessageService) {
                result = result && this.scheduleMessageService.load();
            }

            // load Commit Log(零拷贝技术)
            result = result && this.commitLog.load();

            // load Consume Queue(零拷贝技术)
            result = result && this.loadConsumeQueue();
           // 省略 ...
        }            
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

# 4 消息写入流程

Producer 发送消息到 Broker 后,Broker 内部会经过 网络层 → 存储逻辑层 → 存储 I/O 层 三个阶段,最终把消息写入 CommitLog,并由后台线程异步构建 ConsumeQueue。

整体流程如下图所示(逻辑层视角):

Netty → SendMessageProcessor → DefaultMessageStore → CommitLog → MappedFile(page cache) → Flush → ReputMessageService(构建 CQ / Index)

下面分层介绍整条写入链路。

# 4.1 网络层:SendMessageProcessor 接收并处理请求

Broker 使用 Netty 接收客户端请求,发送消息的请求交给 SendMessageProcessor.processRequest 处理:

public RemotingCommand processRequest(ChannelHandlerContext ctx,
                                      RemotingCommand request) {
    return asyncProcessRequest(ctx, request).get();
}
1
2
3
4

这里虽然使用的是 asyncProcessRequest,但最终调用 .get() 阻塞等待结果,是 同步返回。 Future 只是统一接口风格,方便后续扩展真正异步处理。

# 4.2 存储入口:DefaultMessageStore.asyncPutMessage

DefaultMessageStore 是存储层统一入口,负责:

  • 检查 Broker 是否可写
  • 检查消息是否合法
  • 分发到 CommitLog 执行真正写入

核心逻辑:

public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
    // 1. Broker 存储状态检查
    checkStoreStatus();
    // 2. 消息本身合法性检查
    checkMessage(msg);
    // 3. 进入 CommitLog 执行消息写入
    return commitLog.asyncPutMessage(msg);
}
1
2
3
4
5
6
7
8

# 4.3 存储 I/O 层:CommitLog 写入消息

CommitLog.asyncPutMessage 是写入消息的核心过程,它承担了:

  • 设置存储时间、CRC 校验
  • 延时消息处理
  • 获取或创建 CommitLog 文件
  • 加锁保证顺序写
  • 写入 MappedFile 内存(page cache)
  • 调用刷盘/复制服务

核心代码片段:

public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
    msg.setStoreTimestamp(System.currentTimeMillis());
    msg.setBodyCRC(UtilAll.crc32(msg.getBody()));

    // 延时消息 topic 变化处理
    if (msg.getDelayTimeLevel() > 0) {
        msg.setTopic(SCHEDULE_TOPIC);
        msg.setQueueId(delayLevel2QueueId(msg.getDelayTimeLevel()));
        ...
    }

    // 获取最新的 MappedFile
    MappedFile mappedFile = mappedFileQueue.getLastMappedFile();

    putMessageLock.lock();
    try {
        // 文件满了则创建新文件
        if (mappedFile == null || mappedFile.isFull()) {
            mappedFile = mappedFileQueue.getLastMappedFile(0);
        }

        // 写入 PageCache(并没有立即刷盘)
        AppendMessageResult result = mappedFile.appendMessage(msg, appendMessageCallback);
        // ... 调用刷盘服务、复制服务
    } finally {
        putMessageLock.unlock();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

# 关键点:CommitLog 写入只是“写 page cache”,不等于磁盘落盘

真正的刷盘由专门的 Flush 服务完成(同步/异步刷盘),并由复制服务推动主从复制。


# 4.4 异步生成 ConsumeQueue:ReputMessageService

连续发送5条消息,消息是不定长,首先所有信息先放入 Commitlog中,每一条消息放入 Commitlog 的时候都需要上锁,确保顺序的写入。

当Commitlog写成功了之后。数据通过ReputMessageService类定时同步到ConsumeQueue中,写入Consume Queue的内容是定长的,固定是20个Bytes(offset 8个、size 4个、Hashcode of Tag 8个)。

这种设计非常的巧妙:

查找消息的时候,可以直按根据队列的消息序号,计算出索引的全局位置(比如序号2,就知道偏移量是20),然后直接读取这条索引,再根据索引中记录的消息的全局位置,找到消息。这两次查找是差不多的:第一次在通过序号在consumer Queue中获取数据的时间复杂度是O(1),第二次查找commitlog文件的时间复杂度也是O(1),所以消费时查找数据的时间复杂度也是O(1)。

RocketMQ 的设计是:

CommitLog 是消息的唯一存储。 ConsumeQueue / Index 是由独立线程从 CommitLog 回放(Reput)得到的“派生结构”。

后台线程:ReputMessageService

class ReputMessageService extends ServiceThread {
    @Override
    public void run() {
        while (!isStopped()) {
            doReput();   // 不断重放 CommitLog
        }
    }
}
1
2
3
4
5
6
7
8

# doReput 的核心逻辑:

private void doReput() {
    while (reputFromOffset < commitLog.getMaxOffset()) {
        // 1. 从 CommitLog 读取一条消息
        SelectMappedBufferResult data = commitLog.getData(reputFromOffset);

        // 2. 解析为 DispatchRequest
        DispatchRequest req = messageDecoder.decode(data);

        // 3. 分发给多个 Dispatcher
        doDispatch(req);   // ConsumeQueue / Index 均在此构建

        // 4. 更新偏移
        reputFromOffset += req.getMsgSize();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

其中:

  • CommitLogDispatcherBuildConsumeQueue:负责写入 ConsumeQueue
  • CommitLogDispatcherBuildIndex:负责构建 IndexFile

# 总结一句话:

CommitLog 写入成功后,并不会立即生成 ConsumeQueue,而是由 ReputMessageService 后台线程异步构建,通常有几十毫秒到几百毫秒的延迟(dispatch lag)。

# 4.5 RocketMQ 写入流程总结图

下面是简化版完整链路:

Producer
   ↓(发送消息)
Netty Server
   ↓(解析请求)
SendMessageProcessor
   ↓
DefaultMessageStore.asyncPutMessage
   ↓
CommitLog.asyncPutMessage
   ↓(加锁顺序写入)
MappedFile.appendMessage   ← 写入 page cache(未刷盘)
   ↓
(FlushService 异步/同步刷盘)
   ↓
(HAService 复制到 Slave)

【后台异步】
ReputMessageService
   ↓(从 CommitLog 回放消息)
CommitLogDispatcherBuildConsumeQueue
   ↓
更新 ConsumeQueue(逻辑队列)

CommitLogDispatcherBuildIndex
   ↓
更新 IndexFile
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

整个存储体系就三层:

  1. 业务层(网络层):负责接收和分发请求
  2. 存储逻辑层:DefaultMessageStore、CommitLog、ConsumeQueue 等
  3. 存储 I/O 层:MappedFile(mmap)、Flush、HA

# 4.6 为什么 RocketMQ 要异步生成 ConsumeQueue?

这是 RocketMQ 高性能的核心原因:

方案 消息写入延迟 吞吐量
写 CommitLog + 同步生成 CQ
写 CommitLog + 异步生成 CQ(RocketMQ)

原因:

  • CommitLog 是顺序写,非常快
  • ConsumeQueue 写入比较随机、开销更大
  • 解耦后,消息写入路径最短、延迟最低
  • 消费进度和实际消息读取最终一致即可(CQ 可以重建)

整个存储设计层次非常清晰,大致的层次如下图:

image.png

# 5 源码分析中亮点

# 5.1 同步双写数倍性能提升的CompletableFuture

在RocketMQ4.7.0之后,RocketMQ大量使用Java中的异步编程接口CompletableFuture。尤其是在Broker端进行消息接收处理时。

比如:DefaultMessageStore类中asyncPutMessage方法

public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
        PutMessageStatus checkStoreStatus = this.checkStoreStatus();
        if (checkStoreStatus != PutMessageStatus.PUT_OK) {
            return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null));
        }

        PutMessageStatus msgCheckStatus = this.checkMessage(msg);
        if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
            return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null));
        }

        long beginTime = this.getSystemClock().now();
        //这里会进入commitLog的消息处理逻辑
        CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);
1
2
3
4
5
6
7
8
9
10
11
12
13
14

Future接口正是设计模式中Future模式的一种实现:如果一个请求或任务比较耗时,可以将方法调用改为异步,方法立即返回,任务则使用主线程外的其他线程异步执行,主线程继续执行。当需要获取计算结果时,再去获取数据。

在Master-Slave主从架构下,Master 节点与 Slave 节点之间数据同步/复制的方式有同步双写和异步复制两种模式。同步双写是指Master将消息成功落盘后,需要等待Slave节点复制成功(如果有多个Slave,成功复制一个就可以)后,再告诉客户端消息发送成功。

image.png

RocketMQ 4.7.0 以后合理使用CompletableFuture对同步双写进行性能优化,使得对消息的处理流式化,大大提高了Broker的接收消息的处理能力。

# ⭐5.2 Commitlog 写入时使用可重入锁还是自旋锁?

RocketMQ 在写入 CommitLog 时会使用互斥锁保证“同一时刻只有一个线程写 CommitLog”。 为了兼容不同的刷盘策略(同步刷盘 / 异步刷盘),RocketMQ 提供了两种可选锁:

  • 自旋锁(SpinLock)
  • 可重入锁(ReentrantLock)

源码如下:

 //todo putMessgae会有多个线程并行处理,需要上锁
        putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
        try {
            long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
            this.beginTimeInLock = beginLockTimestamp;
            ... 省略 ....
            elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
            beginTimeInLock = 0;

        } finally {
            putMessageLock.unlock();//解锁:标准的lock锁的方式
        }
1
2
3
4
5
6
7
8
9
10
11
12

RocketMQ 允许通过配置项动态切换:

useReentrantLockWhenPutMessage=false(默认使用自旋锁)
1

RocketMQ 官方文档优化建议:异步刷盘建议使用自旋锁,同步刷盘建议使用重入锁

//useReentrantLockWhenPutMessage参数默认是false,使用自旋锁。异步刷盘建议使用自旋锁,同步刷盘建议使用重入锁
        this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ?
                new PutMessageReentrantLock() : new PutMessageSpinLock();
1
2
3

同步刷盘时,锁竞争激烈,会有较多的线程处于等待阻塞等待锁的状态,如果采用自旋锁会浪费很多的CPU时间,所以“同步刷盘建议使用重入锁”。

异步刷盘是间隔一定的时间刷一次盘,锁竞争不激烈,不会存在大量阻塞等待锁的线程,偶尔锁等待就自旋等待一下很短的时间,不要进行上下文切换了,所以采用自旋锁更合适。

# 5.3 零拷贝技术之MMAP提升文件读写性能

RocketMQ 对 CommitLog、ConsumeQueue 等文件的读写都采用了 MMAP(内存映射) 技术。 底层实现依赖 JDK NIO 的 MappedByteBuffer.map() 方法,将磁盘文件直接映射到用户空间。

# 1. 为什么需要 MMAP?(解决“多次拷贝”问题)

如果不使用 MMAP,传统的文件 I/O 流程如下:

磁盘 → 内核页缓存(Kernel Buffer) → 用户空间(User Buffer)
1

整个过程需要 两次数据拷贝

  1. 磁盘 → 内核缓冲区
  2. 内核缓冲区 → 用户空间

这会消耗:

  • CPU 拷贝开销
  • 内存带宽
  • 系统调用开销(read/write)

在高吞吐 MQ 场景下,这种方式成本太高。

# 2. MMAP 如何实现“零拷贝”?

MMAP 会将文件的一段区域直接“映射”为用户空间的一块内存:

磁盘文件 ↔ 映射到用户空间的虚拟地址
1

因此,读取数据时:

  • 不需要从内核缓存复制到用户缓存
  • CPU 不参与中间复制动作
  • 应用可以像使用普通内存一样操作文件内容

实际 I/O 流程变为:

磁盘 → 用户空间映射区(仅一次拷贝)
1

减少一次拷贝,属于 零拷贝技术的一种实现方式

RocketMQ 的 commitlog/consumequeue 写入,即通过直接写映射内存,写入 PageCache,后续刷盘由操作系统负责。

image.png

Broker启动时MMAP相关源码如下:

MappedFile类的init方法

private void init(final String fileName, final int fileSize) throws IOException {
    this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();

    // 核心:将文件映射到内存(mmap)
    this.mappedByteBuffer =
            this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);

    TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
    TOTAL_MAPPED_FILES.incrementAndGet();
}
1
2
3
4
5
6
7
8
9
10

说明:

  • FileChannel.map() 会创建一个 MappedByteBuffer
  • 该 buffer 指向文件在用户空间的映射区域
  • 后续所有写入 CommitLog 的操作(appendMessage)都是对这段内存的直接写入

这意味着:

  • 写 CommitLog 不需要系统调用 write()
  • 写入仅为内存操作(写 PageCache)
  • 刷盘由操作系统的 VM 机制异步完成(或由 RocketMQ 刷盘服务主动调用 force()

# 5.4 堆外内存机制

RocketMQ 在默认情况下通过 MMAP + PageCache 完成 CommitLog 的写入与读取; 除此之外,还提供了一种 堆外内存缓冲机制(TransientStorePool),用于进一步提升写入性能、减少 PageCache 竞争。

TransientStorePool 属于 短暂缓冲池(堆外内存),通过 DirectByteBuffer 直接分配内存,不占用 JVM Heap。

# 5.5.1 开启条件及限制

要启用堆外内存,需要在 broker 配置中开启:

transientStorePoolEnable = true
1

开启该机制的前提条件:

  1. 必须是异步刷盘(ASYNC_FLUSH) 因为堆外内存写入是“两段式写入”,同步刷盘会导致延迟非常大。
  2. Broker 必须为主节点(Master) Slave 节点不支持,因为 Slave 不负责写入 CommitLog。

DefaultMessageStore. DefaultMessageStore()

this.transientStorePool = new TransientStorePool(messageStoreConfig);
if (messageStoreConfig.isTransientStorePoolEnable()) {
    this.transientStorePool.init();  // 初始化堆外内存池
}
1
2
3
4

# 5.5.2 堆外缓冲区流程

开启 TransientStorePool 后,写入 CommitLog 的流程从原来的:

Producer → PageCache(MMAP) →  CommitLog 文件
1

变为“两层架构”:

Producer → DirectByteBuffer(堆外内存)PageCache(MMAP) → CommitLog 文件
1

对应 RocketMQ 中的流程:

  1. 写消息先写入 DirectByteBuffer(堆外缓冲池)
  2. Commit 线程 定期将堆外数据拷贝到 MappedByteBuffer(即 PageCache)
  3. Flush 线程 将 PageCache 刷盘到磁盘文件

示意图如下:

# 5.5.3 为什么要多这一层?

  • DirectByteBuffer 位于 JVM 堆外,不会触发 GC
  • 堆外内存可以通过 mlock 长期锁定在物理内存中,避免 swap
  • 写入时不会直接污染 PageCache,使读写解耦,降低锁竞争